package com.atguigu.function;

import com.atguigu.pojo.Event;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.TimeUtils;

import java.util.concurrent.TimeUnit;

/**
 * Flink source that emits one synthetic {@link Event} per second.
 *
 * <p>Each event pairs a user name picked at random from {@link #users} with a
 * URL picked at random from {@link #urls}, stamped with the wall-clock time
 * ({@link System#currentTimeMillis()}) at the moment of emission.
 *
 * <p>The emit loop runs until {@link #cancel()} is called.
 */
public class EventSourceFunction implements SourceFunction<Event> {

    /** Candidate user names; one is chosen at random for every emitted event. */
    String [] users = {"Zhangsan","Lisi","Wangwu","Tom","Jerry"};

    /** Candidate URLs; one is chosen at random for every emitted event. */
    String [] urls = { "/home","/list","/detail","/pay","/cart"};

    /**
     * Loop guard for {@link #run}. Declared {@code volatile} because
     * {@link #cancel()} may be invoked from a different thread than the one
     * executing {@link #run}, and the write must become visible to the loop.
     */
    private volatile boolean running = true;

    /**
     * Emits a random event through {@code ctx} once per second until the
     * source is cancelled.
     *
     * @param ctx the context used to emit events downstream
     * @throws Exception if emitting fails or the thread is interrupted while
     *                   sleeping between events
     */
    @Override
    public void run (SourceContext<Event> ctx) throws Exception{
        while (running) {
            String user = users[RandomUtils.nextInt(0, users.length)];
            String url = urls[RandomUtils.nextInt(0, urls.length)];

            // Primitive long: no need to box the timestamp in a local.
            long ts = System.currentTimeMillis();

            ctx.collect(new Event(user, url, ts));

            // Sleep for 1 second between events.
            TimeUnit.SECONDS.sleep(1);
        }
    }

    /**
     * Requests that {@link #run} stop. The loop exits the next time it checks
     * the flag, i.e. after the current one-second sleep at the latest.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
